Data ETL

The Yelp dataset is extracted using 7-Zip and loaded using Pig.

json_business = LOAD '/user/cloudera/yelp_academic_dataset_business.json' as (str:chararray);

json_business_2 = FOREACH json_business GENERATE REPLACE(str, '\\"neighborhoods\\"\\:\\s\\[\\],\\s','') AS str;

json_business_3 = FOREACH json_business_2 GENERATE  REPLACE(REPLACE(REGEX_EXTRACT(str, '\\"categories\\"\\:\\s\\[(.*?)\\]', 1), '\\"', ''),',','#') AS categories , REPLACE(str, '\\"categories\\"\\:\\s\\[(.*?)\\],', '') AS str;

json_business_4 = FOREACH json_business_3 GENERATE 
    REGEX_EXTRACT(str, '\\"business_id\\"\\:\\s\\"(.*?)\\"', 1) AS business_id, 
    REGEX_EXTRACT(str, '\\"name\\"\\:\\s\\"(.*?)\\"', 1) AS name, 
    categories, 
    REGEX_EXTRACT(str, '\\"review_count\\"\\:\\s(.*?),', 1) AS review_count, 
    REGEX_EXTRACT(str, '\\"stars\\"\\:\\s(.*?),', 1) AS stars,
    REGEX_EXTRACT(str, '\\"open\\"\\:\\s(.*?),', 1) AS open,
    REPLACE(REPLACE(REGEX_EXTRACT(str, '\\"full_address\\"\\:\\s\\"(.*?)\\"', 1),'\\\\n','*'),'\\\\r','*') AS full_address,
    REGEX_EXTRACT(str, '\\"city\\"\\:\\s\\"(.*?)\\"', 1) AS city,
    REGEX_EXTRACT(str, '\\"state\\"\\:\\s\\"(.*?)\\"', 1) AS state,
    REGEX_EXTRACT(str, '\\"longitude\\"\\:\\s(.*?),', 1) AS longitude,
    REGEX_EXTRACT(str, '\\"latitude\\"\\:\\s(.*?),', 1) AS latitude;

STORE json_business_4 INTO '/user/cloudera/yelp__business' USING PigStorage('\u0001');

At first I had some trouble loading the business data as JSON. After some research, I decided to treat each JSON record as a single character string and parse each column out of it using regular expressions. I decided to use Hive for the analysis.
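
As a rough illustration of the regex approach described above, the Python sketch below pulls a few fields out of one JSON record treated as a plain string. The sample record and the helper names `extract_field` and `extract_categories` are made up for this example. The patterns mirror the REGEX_EXTRACT calls in the Pig script, but this is only a sketch of the idea, not the script itself.

```python
import re

# Hypothetical record shaped like one line of the business file (not real data).
SAMPLE = (
    '{"business_id": "abc123", "categories": ["Food", "Coffee & Tea"], '
    '"city": "Pittsburgh", "review_count": 12, "name": "Sample Cafe", '
    '"state": "PA", "stars": 4.5, "type": "business"}'
)

def extract_field(line, key, quoted=True):
    """Return the raw text of one field, or None if the key is absent."""
    if quoted:
        pattern = r'"%s":\s"(.*?)"' % re.escape(key)  # string values
    else:
        pattern = r'"%s":\s(.*?),' % re.escape(key)   # numbers and booleans
    match = re.search(pattern, line)
    return match.group(1) if match else None

def extract_categories(line):
    """Flatten the categories array into one '#'-delimited string."""
    match = re.search(r'"categories":\s\[(.*?)\]', line)
    if match is None:
        return None
    return match.group(1).replace('"', '').replace(',', '#')

print(extract_field(SAMPLE, "name"))
print(extract_field(SAMPLE, "stars", quoted=False))
print(extract_categories(SAMPLE))
```

One limitation of this style of parsing is that a value containing an escaped double quote would be cut short by the non-greedy match, so a real JSON parser is more robust where one is available.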

reviews = 
  LOAD '/user/cloudera/yelp_academic_dataset_review.json'
  USING JsonLoader('votes:map[],user_id:chararray,review_id:chararray,stars:int,date:chararray,text:chararray,type:chararray,business_id:chararray');

tsv = 
  FOREACH reviews
  GENERATE
     -- strip embedded newlines and tabs from the review text so each review stays on one TSV row
     (INT) votes#'funny', (INT) votes#'useful', (INT) votes#'cool', user_id, review_id, stars, REPLACE(REPLACE(text, '\\n', ''), '\\t', ''), date, type, business_id;

STORE tsv INTO 'yelp_academic_dataset_review.tsv';

I did not have the same problem with the review data, but since I had decided to proceed with Hive, I converted the review data to TSV, which is easier to query from Hive.
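
One thing to watch when flattening reviews to TSV is that a newline or tab inside the review text would split a record across rows or columns. The Python sketch below shows the same conversion for a single record. The sample review and the function name `review_to_tsv` are hypothetical, and the column order simply follows the Pig GENERATE statement.

```python
import json

def review_to_tsv(json_line):
    """Convert one review record (a JSON string) into a single TSV row."""
    record = json.loads(json_line)
    votes = record.get("votes", {})
    # Remove real newline and tab characters so the text stays in one cell.
    text = record["text"].replace("\n", "").replace("\t", "")
    fields = [
        votes.get("funny"), votes.get("useful"), votes.get("cool"),
        record["user_id"], record["review_id"], record["stars"],
        text, record["date"], record["type"], record["business_id"],
    ]
    return "\t".join("" if f is None else str(f) for f in fields)

# Hypothetical review record, not taken from the dataset.
SAMPLE_REVIEW = (
    '{"votes": {"funny": 0, "useful": 2, "cool": 1}, "user_id": "u1", '
    '"review_id": "r1", "stars": 4, "date": "2014-02-13", '
    '"text": "Great\\tfood.\\nWill return.", "type": "review", "business_id": "b1"}'
)

row = review_to_tsv(SAMPLE_REVIEW)
print(row.split("\t"))
```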

Both the business data and the review data were then loaded into Hive metastore tables using the create table feature in Cloudera.

data load using create table feature in cloudera

Question 1:

SELECT state, count(review_count) from business 
where state = "PA" OR state = "NC" OR state = "AZ" OR state = "NV" OR state = "WI"  or state = "IL" 
group by state;


SELECT categories, count(review_count) from business 
group by categories;

To summarize the review count for the US cities alone, I filtered on the state column instead of the city column. The Yelp dataset page lists only 6 US cities, but the city column also contains many neighbouring cities that fall within the urban region of a listed city. The state column was therefore a better filter for grouping the review count by US city.
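
The state filter can be sketched in Python as follows. Note that in SQL, count(review_count) counts the rows with a non-null review_count (that is, businesses), while sum(review_count) would total the reviews themselves, so the sketch reports both. The sample rows and the function name `summarize_by_state` are hypothetical.

```python
from collections import defaultdict

# The six states used in the WHERE clause of the query.
US_STATES = {"PA", "NC", "AZ", "NV", "WI", "IL"}

def summarize_by_state(businesses):
    """businesses: iterable of (state, review_count) pairs."""
    summary = defaultdict(lambda: {"businesses": 0, "reviews": 0})
    for state, review_count in businesses:
        # Skip other states and null counts, as the SQL filter and count() do.
        if state in US_STATES and review_count is not None:
            summary[state]["businesses"] += 1          # like count(review_count)
            summary[state]["reviews"] += review_count  # like sum(review_count)
    return dict(summary)

# Hypothetical rows, not taken from the dataset.
sample = [("PA", 10), ("PA", 5), ("NV", 7), ("ON", 3), ("AZ", None)]
print(summarize_by_state(sample))
```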

Output file - Question1a result

For the second part of the question, a straightforward select statement is used to summarize businesses by category.

Output file - Question1b result

Question 2:

SELECT  categories,city,avg(stars) as cnt from business
group by categories,city
order by cnt desc;

For the second question, the average star rating is computed for each category and city pair, and the results are ordered from highest to lowest average so that cities can be ranked within each category.

Output file - Question2 result

Question 3:

Select categories, avg(stars) as str from business
where latitude < 40.532540 and latitude > 40.37061262 and longitude > -80.037803 and longitude < -79.846916
group by categories;

The 5 mile bounding box was determined using Google's distance measurement feature. The latitude and longitude limits of that box are used in the where clause to filter the businesses, and the select statement then returns the average rating grouped by business category.
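
For readers who want to derive such a box without a map tool, the Python sketch below approximates one from a centre point and a distance. It assumes about 69 miles per degree of latitude and scales longitude by the cosine of the latitude. The centre point used here is a hypothetical example and the function names are illustrative, so the result will not exactly match the hand-measured limits in the query. `DOC_BOX` holds the limits from the where clause.

```python
import math

MILES_PER_DEGREE_LAT = 69.0  # rough approximation, an assumption of this sketch

def bounding_box(lat, lon, half_side_miles):
    """Return (south, north, west, east) around a centre point."""
    dlat = half_side_miles / MILES_PER_DEGREE_LAT
    # A degree of longitude shrinks with the cosine of the latitude.
    dlon = half_side_miles / (MILES_PER_DEGREE_LAT * math.cos(math.radians(lat)))
    return (lat - dlat, lat + dlat, lon - dlon, lon + dlon)

def in_box(lat, lon, box):
    """Same strict comparisons as the where clause in the query."""
    south, north, west, east = box
    return south < lat < north and west < lon < east

# Limits copied from the query above.
DOC_BOX = (40.37061262, 40.532540, -80.037803, -79.846916)

box = bounding_box(40.44, -79.94, 5.0)  # hypothetical centre point
print(box)
print(in_box(40.44, -79.94, DOC_BOX))
```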

Output file - Question3 result

Question 4:

SELECT user_id, avg(stars),Count(review_id) as cnt from review
group by user_id
order by cnt desc
limit 10;

From the review data, the top 10 reviewers by count are filtered and their average star rating is displayed.
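
The same aggregation can be sketched in plain Python: accumulate a star total and a review count per user, sort by count, and keep the first ten. The sample rows and the function name `top_reviewers` are hypothetical.

```python
from collections import defaultdict

def top_reviewers(reviews, n=10):
    """reviews: iterable of (user_id, stars). Returns (user_id, avg_stars, count)."""
    stats = defaultdict(lambda: [0, 0])  # user_id -> [star total, review count]
    for user_id, stars in reviews:
        stats[user_id][0] += stars
        stats[user_id][1] += 1
    # Order by review count, highest first, like "order by cnt desc".
    ranked = sorted(stats.items(), key=lambda item: item[1][1], reverse=True)
    return [(user, total / count, count) for user, (total, count) in ranked[:n]]

# Hypothetical rows, not taken from the dataset.
sample_reviews = [("u1", 5), ("u1", 3), ("u2", 4), ("u3", 1), ("u3", 2), ("u3", 3)]
print(top_reviewers(sample_reviews, n=2))
```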

Output file - Question4 result

Question 5:

CREATE TABLE `default`.`top10` AS Select business_id ,stars, categories from business 
where latitude < 40.532540 and latitude > 40.37061262 and longitude > -80.037803 and longitude < -79.846916
and categories  like '%Food%'
order by stars desc
limit 10;

CREATE TABLE `default`.`top10_month` AS SELECT c.business_id, c.date, c.stars, o.business_id as id 
FROM reveiw_0 c JOIN top10 o 
ON (c.business_id = o.business_id);

CREATE TABLE `default`.`top10_month_result` AS SELECT business_id,month(date), avg(stars) from top10_month
group by business_id, month(date);

First, the businesses are filtered by the bounding box criteria above, and the top rated businesses in the food category are stored in the table top10.

Next, the top10 table is joined with the date and stars columns of the review data on business_id.

Finally, using the month() function and a group by statement, the average stars for each month is computed for the top 10 businesses.
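
The monthly grouping step can be sketched in Python as below. The sketch assumes review dates are strings in YYYY-MM-DD form, and the sample rows and the function name `monthly_average` are hypothetical. As with month(date) in the query, grouping on the month number alone pools the same calendar month across different years.

```python
from collections import defaultdict
from datetime import datetime

def monthly_average(rows):
    """rows: iterable of (business_id, date, stars). Returns {(id, month): avg}."""
    totals = defaultdict(lambda: [0, 0])  # (business_id, month) -> [total, count]
    for business_id, date, stars in rows:
        month = datetime.strptime(date, "%Y-%m-%d").month  # assumed date format
        bucket = totals[(business_id, month)]
        bucket[0] += stars
        bucket[1] += 1
    return {key: total / count for key, (total, count) in totals.items()}

# Hypothetical joined rows, not taken from the dataset.
sample_rows = [
    ("b1", "2014-01-05", 5),
    ("b1", "2014-01-20", 4),
    ("b1", "2015-01-02", 3),  # same month in a different year, pooled with the others
    ("b2", "2014-02-01", 2),
]
print(monthly_average(sample_rows))
```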

Output file - Question5a result

CREATE TABLE `default`.`bottom10` AS Select business_id ,stars, categories from business 
where latitude < 40.532540 and latitude > 40.37061262 and longitude > -80.037803 and longitude < -79.846916
and categories  like '%Food%'
order by stars
limit 10;

CREATE TABLE `default`.`bottom10_month` AS SELECT c.business_id, c.date, c.stars, o.business_id as id 
FROM reveiw_0 c JOIN bottom10 o 
ON (c.business_id = o.business_id);

CREATE TABLE `default`.`bottom10_month_result` AS SELECT business_id,month(date), avg(stars) from bottom10_month
group by business_id, month(date);

The same process as above is followed for the bottom 10 businesses.

Output file - Question5b result

Extra points

Repeating the exercise in Spark

For this I use the SparkR package with R to carry out the exercise.

library(SparkR)
## 
## Attaching package: 'SparkR'
## The following objects are masked from 'package:stats':
## 
##     cov, filter, lag, na.omit, predict, sd, var
## The following objects are masked from 'package:base':
## 
##     colnames, colnames<-, intersect, rank, rbind, sample, subset,
##     summary, table, transform
Sys.setenv(SPARK_HOME="C:/spark-1.6.2-bin-hadoop2.6")

.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))

library("SparkR", lib.loc="C:/spark-1.6.2-bin-hadoop2.6/lib") # The use of \\ is for the Windows environment.


sc=sparkR.init(master="local")
## Launching java with spark-submit command C:/spark-1.6.2-bin-hadoop2.6/bin/spark-submit.cmd   sparkr-shell C:\Users\VISHES~1\AppData\Local\Temp\RtmpO22YGV\backend_port10e426654409

Once the SparkR library is loaded and we are connected to Spark, we load the files.

sc <- sparkR.init(sparkPackages="com.databricks:spark-csv_2.11:1.0.3")
## Re-using existing Spark Context. Please stop SparkR with sparkR.stop() or restart R to create a new Spark Context
sqlContext <- sparkRSQL.init(sc)

business <-read.df(sqlContext,"D:/test_project/yelp/yelp_academic_dataset_business.json", "json")


user<-read.df(sqlContext,"D:/test_project/yelp/yelp_academic_dataset_user.json", "json")


review<-read.df(sqlContext,"D:/test_project/yelp/yelp_academic_dataset_review.json", "json")

Question 1:

head(summarize(groupBy(business, business$state), count = n(business$review_count)))
##   state count
## 1    NC  6162
## 2    AL     1
## 3    TX     2
## 4    NM     1
## 5    AR     1
## 6    NV 21233
head(summarize(groupBy(business, business$categories), count = n(business$review_count)))
##                                                                    categories
## 1                                                  Home Services, Contractors
## 2                                                 Lingerie, Shopping, Fashion
## 3                                    Hair Salons, Hair Removal, Beauty & Spas
## 4   Food, Beer, Wine & Spirits, Arts & Entertainment, Nightlife, Music Venues
## 5                Nightlife, Bars, American (Traditional), Diners, Restaurants
## 6 Speech Therapists, Occupational Therapy, Health & Medical, Physical Therapy
##   count
## 1   151
## 2    45
## 3    17
## 4     1
## 5     1
## 6     5

Question 2:

registerTempTable(business, "business")

query2 <- sql(sqlContext, "SELECT categories,city,avg(stars) as cnt from business
group by categories,city
order by cnt desc")

head(query2)
##                                                                             categories
## 1 Local Services, Professional Services, Printing Services, Signmaking, Graphic Design
## 2   Arts & Entertainment, Shopping, Art Classes, Arts & Crafts, Paint & Sip, Education
## 3                                    Health & Medical, Physical Therapy, Chiropractors
## 4                                          Tobacco Shops, Food, Coffee & Tea, Shopping
## 5                                                    Massage Therapy, Health & Medical
## 6                                                               Barbers, Beauty & Spas
##         city cnt
## 1  Las Vegas   5
## 2     Anthem   5
## 3 Scottsdale   5
## 4  Las Vegas   5
## 5   Sun City   5
## 6   Bellevue   5

Question 3:

query3<-sql(sqlContext, "Select categories, avg(stars) as str from business
where latitude < 40.532540 and latitude > 40.37061262 and longitude > -80.037803 and longitude < -79.846916
group by categories")

head (query3)
##                                                        categories  str
## 1                                     Lingerie, Shopping, Fashion 3.25
## 2                        Hair Salons, Hair Removal, Beauty & Spas 3.25
## 3 Food, Vegetarian, Coffee & Tea, Breakfast & Brunch, Restaurants 4.00
## 4                                           Professional Services 1.00
## 5                                  Peruvian, Burgers, Restaurants 3.75
## 6                           Egyptian, Middle Eastern, Restaurants 4.00

Question 4:

registerTempTable(user, "user")

query4<-sql(sqlContext, "SELECT user_id, average_stars,review_count from user
order by review_count desc limit 10")

head(query4)
##                  user_id average_stars review_count
## 1 JLM36sYWmouJAZ2knzst7A          3.27        10320
## 2 1p3d__fuRRCDXfbS1Tq0wA          3.74         8529
## 3 3zBKfA8-_fJRagWSTMLVvg          3.36         5833
## 4 VhI6xyylcAxi0wOy2HOX3w          3.48         5648
## 5 22-6yC05pgWbLupHZTjQig          3.26         5050
## 6 pz97SxRe1Vk-5_K6EB9OSA          3.43         3953

Question 5:

top10<- sql(sqlContext,"Select business_id ,stars, categories from business 
where latitude < 40.532540 and latitude > 40.37061262 and longitude > -80.037803 and longitude < -79.846916 and categories  like '%Food%'
order by stars desc
limit 10")

registerTempTable(top10, "top10")
registerTempTable(review, "review")

top10_month <-sql(sqlContext,"SELECT c.business_id, c.date, c.stars, o.business_id as id 
FROM review c JOIN top10 o 
ON (c.business_id = o.business_id)")

registerTempTable(top10_month, "top10_month")

top10_month_result <- sql(sqlContext,"SELECT business_id,month(date) as mnth, avg(stars) as strs from top10_month
group by business_id, month(date)")

head(top10_month_result)
##              business_id mnth     strs
## 1 C-8JfFK8O2vgukDN3nHt6g    9 5.000000
## 2 C-8JfFK8O2vgukDN3nHt6g   10 5.000000
## 3 C-8JfFK8O2vgukDN3nHt6g   12 5.000000
## 4 EpPfFCLETEnzAw324Y666A    1 5.000000
## 5 EpPfFCLETEnzAw324Y666A    2 4.500000
## 6 EpPfFCLETEnzAw324Y666A    4 4.666667
bottom10<- sql(sqlContext,"Select business_id ,stars, categories from business 
where latitude < 40.532540 and latitude > 40.37061262 and longitude > -80.037803 and longitude < -79.846916 and categories  like '%Food%'
order by stars
limit 10")

registerTempTable(bottom10, "bottom10")

bottom10_month <-sql(sqlContext,"SELECT c.business_id, c.date, c.stars, o.business_id as id 
FROM review c JOIN bottom10 o 
ON (c.business_id = o.business_id)")

registerTempTable(bottom10_month, "bottom10_month")

bottom10_month_result <- sql(sqlContext,"SELECT business_id,month(date) as mnth, avg(stars) as strs from bottom10_month
group by business_id, month(date)")

head(bottom10_month_result)
##              business_id mnth strs
## 1 6C1Igw4BzRmg5Et8GSVfpA    6    1
## 2 9KsHPdF1-P_CiXnvugdQvQ    1    1
## 3 9KsHPdF1-P_CiXnvugdQvQ    4    1
## 4 9KsHPdF1-P_CiXnvugdQvQ    8    1
## 5 BbIh5NTizhV4Fq_mLmNkpg    8    1
## 6 CL3tZqbYT7B5zgewKCS6-Q    8    1

The main difference between Spark and Hadoop is the support for loading and working with JSON files. Apart from that, there was a significant improvement when running the same SQL queries in Spark compared to Hadoop.

Statistical analysis and Visualizing results with R

library('RJSONIO')
library('rpart')
library('party')
## Loading required package: grid
## 
## Attaching package: 'grid'
## The following object is masked from 'package:SparkR':
## 
##     explode
## Loading required package: mvtnorm
## Loading required package: modeltools
## Loading required package: stats4
## Loading required package: strucchange
## Loading required package: zoo
## 
## Attaching package: 'zoo'
## The following objects are masked from 'package:base':
## 
##     as.Date, as.Date.numeric
## Loading required package: sandwich
## 
## Attaching package: 'party'
## The following object is masked from 'package:SparkR':
## 
##     where
library('partykit')
## 
## Attaching package: 'partykit'
## The following objects are masked from 'package:party':
## 
##     cforest, ctree, ctree_control, edge_simple, mob, mob_control,
##     node_barplot, node_bivplot, node_boxplot, node_inner,
##     node_surv, node_terminal
library('caret')
## Loading required package: lattice
## Loading required package: ggplot2
library('tree')
library(leaflet)
query1 <- sql(sqlContext, "SELECT state, count(review_count) from business 
where state = 'PA' OR state = 'NC' OR state = 'AZ' OR state = 'NV' OR state = 'WI'  or state = 'IL' 
group by state")

q1 <- as.data.frame(head(query1))

barplot(q1$`_c1`, main="Review Count US cities", horiz=FALSE,
        names.arg=c(q1$state))

business_data <- as.data.frame(t(sapply(readLines("D:/test_project/yelp/yelp_academic_dataset_business.json"), fromJSON)))
phoenix <- business_data[business_data$state == "AZ",]
Pittsburgh <- business_data[business_data$state == "PA",]
charolette <- business_data[business_data$state == "NC",]
urbana <- business_data[business_data$state == "IL",]
madison <- business_data[business_data$state == "WI",]
las_vegas <- business_data[business_data$state == "NV",]

paste("pittsburgh", sum(as.numeric(Pittsburgh$review_count)))
## [1] "pittsburgh 101046"
n <- leaflet()%>%
addTiles() %>%
addMarkers(lng=-79.995886, lat=40.440625, popup=paste("pittsburgh", sum(as.numeric(Pittsburgh$review_count))))%>%
  
addMarkers(lng=-80.84, lat=35.23, popup=paste("Charlotte", sum(as.numeric(charolette$review_count))))%>%

  addMarkers(lng=-88.20727, lat=40.110588, popup=paste("Urbana", sum(as.numeric(urbana$review_count))))%>%

  addMarkers(lng=-112.0740400, lat=33.4483800   , popup=paste("Phoenix", sum(as.numeric(phoenix$review_count))))%>%

  addMarkers(lng=-115.172813, lat=36.114647, popup=paste("Las Vegas", sum(as.numeric(las_vegas$review_count))))%>%

  addMarkers(lng=-89.38, lat=43.07, popup=paste("Madison", sum(as.numeric(madison$review_count))))
n

By clicking on the markers we can view a summary of the total number of reviews in each US city.

Top_10 <- business_data[business_data$state == "PA" & business_data$latitude < 40.532540 & business_data$latitude > 40.37061262 & business_data$longitude > -80.037803 & business_data$longitude < -79.846916,]

Top_10$review_count<-as.numeric(Top_10$review_count)
Top_10$latitude<-as.numeric(Top_10$latitude)
Top_10$longitude<-as.numeric(Top_10$longitude)

m <- leaflet(Top_10)%>%
  addCircles(lng = ~longitude, lat = ~latitude, weight = 1,
    radius = ~review_count*2, popup = ~name
  )%>%
  addTiles() %>%  # Add default OpenStreetMap map tiles
  addMarkers(lng=-79.9428294, lat=40.4411801, popup="Carnegie Mellon University")%>%
  
  addRectangles(
    lng1=-80.037803, lat1=40.532540,
    lng2=-79.846916, lat2= 40.37061262,
    fillColor = "transparent"
  )

m 

The bounding box is visualized using R. The businesses within the bounding box are plotted on the map as circles whose radius scales with review_count.

Resources & references -

http://gethue.com/hadoop-tutorials-ii-1-prepare-the-data-for-analysis/
http://hortonworks.com/blog/hive-cheat-sheet-for-sql-users/
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF
http://stackoverflow.com/questions/18312090/pig-string-extraction-using-regex
https://pig.apache.org/docs/r0.8.1/piglatin_ref2.html
https://spark.apache.org/docs/latest/sparkr.html